{#
  Template for 'request_codec_request_test.cc'.

  Provides integration tests using Kafka codec.
  The tests do the following:
  - create the message,
  - serialize the message into buffer,
  - pass the buffer to the codec,
  - capture messages received in callback,
  - verify that captured messages are identical to the ones sent.
#}
#include "contrib/kafka/filters/network/source/external/requests.h"
#include "contrib/kafka/filters/network/source/request_codec.h"

#include "contrib/kafka/filters/network/test/buffer_based_test.h"
#include "contrib/kafka/filters/network/test/serialization_utilities.h"

#include "gtest/gtest.h"

namespace Envoy {
namespace Extensions {
namespace NetworkFilters {
namespace Kafka {
namespace RequestCodecRequestTest {

class RequestCodecRequestTest : public testing::Test, public MessageBasedTest<RequestEncoder> {};

using RequestCapturingCallback = CapturingCallback<RequestCallback, AbstractRequestSharedPtr,
  RequestParseFailureSharedPtr>;

{% for message_type in message_types %}

// Integration test for {{ message_type.name }} messages.

TEST_F(RequestCodecRequestTest, ShouldHandle{{ message_type.name }}Messages) {
  // given
  using RequestUnderTest = Request<{{ message_type.name }}>;

  std::vector<RequestUnderTest> sent;
  int32_t correlation = 0;

  {% for field_list in message_type.compute_field_lists() %}
  for (int i = 0; i < 100; ++i ) {
    {# Request header cannot contain tagged fields if request does not support them. #}
    const TaggedFields tagged_fields = requestUsesTaggedFieldsInHeader(
      {{ message_type.get_extra('api_key') }}, {{ field_list.version }}) ?
        TaggedFields{ { TaggedField{ 10, Bytes{1, 2, 3, 4} } } }:
        TaggedFields({});
    const RequestHeader header = {
      {{ message_type.get_extra('api_key') }},
      {{ field_list.version }},
      correlation++,
      "id",
      tagged_fields
    };
    const {{ message_type.name }} data = { {{ field_list.example_value() }} };
    const RequestUnderTest request = {header, data};
    putMessageIntoBuffer(request);
    sent.push_back(request);
  }
  {% endfor %}

  const InitialParserFactory& initial_parser_factory = InitialParserFactory::getDefaultInstance();
  const RequestParserResolver& request_parser_resolver =
    RequestParserResolver::getDefaultInstance();
  const auto callback = std::make_shared<RequestCapturingCallback>();

  RequestDecoder testee{initial_parser_factory, request_parser_resolver, {callback}};

  // when
  testee.onData(buffer_);

  // then
  const std::vector<AbstractRequestSharedPtr>& received = callback->getCapturedMessages();
  ASSERT_EQ(received.size(), sent.size());
  ASSERT_EQ(received.size(), correlation);

  for (size_t i = 0; i < received.size(); ++i) {
    const std::shared_ptr<RequestUnderTest> request =
      std::dynamic_pointer_cast<RequestUnderTest>(received[i]);
    ASSERT_NE(request, nullptr);
    ASSERT_EQ(*request, sent[i]);
  }
}
{% endfor %}

} // namespace RequestCodecRequestTest
} // namespace Kafka
} // namespace NetworkFilters
} // namespace Extensions
} // namespace Envoy
